package flinkstudy.stream.windows;

import flinkstudy.bo.EstateInfo;
import flinkstudy.stream.environment.FlinkStreamExecutionEnvironment;
import flinkstudy.stream.source.mock.DynamicUnlimitedData;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Objects;
import java.util.Optional;

/**
 * Demonstrates Flink watermarks, whose main purpose is to cope with out-of-order and late-arriving
 * event-time data.
 * <p>
 * Two assigner styles are shown: periodic watermarks ({@link AssignerWithPeriodicWatermarks}) and
 * punctuated watermarks ({@link AssignerWithPunctuatedWatermarks}).
 *
 * @author daocr
 * @date 2020/2/5
 */
public class Watermark {

    private FlinkStreamExecutionEnvironment flinkStreamExecutionEnvironment = null;


    /**
     * Creates a fresh environment wrapper before each test.
     */
    @Before
    public void init() {
        flinkStreamExecutionEnvironment = new FlinkStreamExecutionEnvironment();
    }

    /**
     * Periodic watermarks: {@link AssignerWithPeriodicWatermarks}.
     * <p>
     * The framework periodically calls {@code getCurrentWatermark()} and inserts the returned
     * watermark into the stream. The period defaults to 200 ms and can be changed with
     * {@code env.getConfig().setAutoWatermarkInterval(long)}.
     * <p>
     * Each element's timestamp is its {@code createTime}. The watermark trails the largest timestamp
     * seen so far by 10 s, so elements arriving up to roughly 10 s out of order are still counted.
     * Prices are summed per {@code cityId} over 10 s tumbling event-time windows and the resulting
     * {@code (cityId, price)} pairs are printed.
     *
     * @see AscendingTimestampExtractor for timestamps that arrive in ascending order
     * @see BoundedOutOfOrdernessTimestampExtractor for timestamps that are out of order by a bounded amount
     * @see IngestionTimeExtractor for ingestion-time timestamps
     */
    @Test
    public void withPeriodicWatermarks() throws Exception {

        StreamExecutionEnvironment env = flinkStreamExecutionEnvironment.streamExecutionEnvironment;
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<EstateInfo> streamSource = env.addSource(new DynamicUnlimitedData());

        // Periodic watermark assigner
        SingleOutputStreamOperator<EstateInfo> estateInfoSingleOutputStreamOperator = streamSource.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<EstateInfo>() {

            /**
             * Largest event timestamp this assigner has extracted so far.
             */
            private long currentMaxTimestamp = 0L;

            /**
             * Maximum tolerated out-of-orderness: 10 s.
             */
            private final long maxOutOfOrderness = 10000L;

            /**
             * Most recently emitted watermark, kept only for the debug output in extractTimestamp;
             * null until getCurrentWatermark() has been called once.
             */
            private org.apache.flink.streaming.api.watermark.Watermark watermark;

            /**
             * Formatter for the debug output, created once rather than once per element.
             */
            private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            @Nullable
            @Override
            public org.apache.flink.streaming.api.watermark.Watermark getCurrentWatermark() {
                // Lag behind the highest timestamp seen so slightly late elements are not dropped.
                watermark = new org.apache.flink.streaming.api.watermark.Watermark(currentMaxTimestamp - maxOutOfOrderness);
                return watermark;
            }


            @Override
            public long extractTimestamp(EstateInfo element, long previousElementTimestamp) {

                long time = element.getCreateTime().getTime();

                currentMaxTimestamp = Math.max(time, currentMaxTimestamp);

                System.out.println("element date: " + element.getCreateTimeFormat()
                        + "\tcurrentMaxTimestamp :" + simpleDateFormat.format(new Date(currentMaxTimestamp))
                        + "\twatermark: " + Optional.ofNullable(watermark).map(e -> simpleDateFormat.format(e.getTimestamp())).orElse(null));

                return time;
            }
        });


        estateInfoSingleOutputStreamOperator.keyBy("cityId")
                // 10 s tumbling event-time windows; price is summed per cityId within each window
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sum("price")
                .process(new ProcessFunction<EstateInfo, Tuple2<Integer, Long>>() {
                    @Override
                    public void processElement(EstateInfo value, Context ctx, Collector<Tuple2<Integer, Long>> out) throws Exception {
                        out.collect(Tuple2.of(value.getCityId(), value.getPrice()));
                    }
                })
                .print();


        env.execute();

    }


    /**
     * Punctuated watermarks: {@link AssignerWithPunctuatedWatermarks}.
     * <p>
     * Watermarks are not generated on a timer. After each element, the assigner decides whether to
     * emit one, so characteristics of the business data can drive watermark progress. Here only
     * records whose {@code cityId} is 13 emit a watermark, equal to their timestamp minus 1 s.
     * <p>
     * Records are counted and their prices summed per {@code cityId} over 10 s event-time windows.
     */
    @Test
    public void withPunctuatedWatermarks() throws Exception {

        StreamExecutionEnvironment env = flinkStreamExecutionEnvironment.streamExecutionEnvironment;
        // Without event time, timeWindow(...) below builds processing-time windows and the assigned
        // timestamps/watermarks would have no effect on when windows fire.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<EstateInfo> streamSource = env.addSource(new DynamicUnlimitedData());


        // Pass-through map that logs the current thread and each element before timestamps are assigned.
        SingleOutputStreamOperator<EstateInfo> map = streamSource.map(new MapFunction<EstateInfo, EstateInfo>() {
            @Override
            public EstateInfo map(EstateInfo value) throws Exception {
                System.out.println("Watermark.map" + Thread.currentThread().getName()
                        + "\t value:" + value);
                return value;
            }
        });

        SingleOutputStreamOperator<EstateInfo> estateInfoSingleOutputStreamOperator = map.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<EstateInfo>() {

            /**
             * Maximum tolerated out-of-orderness: 1 s.
             */
            private final long maxOutOfOrderness = 1000L;

            /**
             * Only records from this city emit a watermark.
             */
            private final int watermarkCityId = 13;

            @Nullable
            @Override
            public org.apache.flink.streaming.api.watermark.Watermark checkAndGetNextWatermark(EstateInfo lastElement, long extractedTimestamp) {

                // Specific business records drive the watermark. Objects.equals also tolerates a null cityId.
                if (Objects.equals(lastElement.getCityId(), watermarkCityId)) {
                    // extractedTimestamp is the value extractTimestamp returned for this element.
                    return new org.apache.flink.streaming.api.watermark.Watermark(extractedTimestamp - maxOutOfOrderness);
                }

                // null means "emit no watermark for this element".
                return null;
            }

            @Override
            public long extractTimestamp(EstateInfo element, long previousElementTimestamp) {
                return element.getCreateTime().getTime();
            }
        });

        estateInfoSingleOutputStreamOperator.keyBy("cityId").timeWindow(Time.seconds(10))
                // Accumulator is (element count, price sum).
                .aggregate(new AggregateFunction<EstateInfo, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> createAccumulator() {
                        return Tuple2.of(0L, 0L);
                    }

                    @Override
                    public Tuple2<Long, Long> add(EstateInfo value, Tuple2<Long, Long> accumulator) {
                        return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + value.getPrice());
                    }

                    @Override
                    public Tuple2<Long, Long> getResult(Tuple2<Long, Long> accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                    }
                }, new ProcessWindowFunction<Tuple2<Long, Long>, String, Tuple, TimeWindow>() {
                    // Formats the window bounds, current watermark, key, and aggregate result as one line.
                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Tuple2<Long, Long>> elements, Collector<String> out) throws Exception {
                        StringBuilder stringBuilder = new StringBuilder();
                        TimeWindow window = context.window();
                        long currentWatermark = context.currentWatermark();

                        stringBuilder.append("TimeWindow :" + window);
                        stringBuilder.append("\tcurrentWatermark" + currentWatermark);
                        stringBuilder.append("\tvalue" + tuple);
                        stringBuilder.append("\telements" + elements);

                        out.collect(stringBuilder.toString());

                    }
                }).print();


        env.execute();

    }


}
